[WIP]feat: use user space buff to write binlog instead of Mmap #2848

Open
wants to merge 5 commits into base: unstable

Conversation

@cheniujh (Collaborator) commented Aug 6, 2024

Update: the earlier fwrite-based implementation did not batch writes well, so a new design based on a self-managed buffer has been drafted below. It has been scheduled; the refactor will start as soon as higher-priority work is finished, and the final code will land in this PR.

Pika binlog batched-write plan:

Summary: the binlog is currently written through Mmap, which triggers frequent page faults in this workload (Perf also shows a non-trivial PageFault cost). This plan uses a self-managed user-space buffer to accumulate binlog records and write them in batches. The buffer is flushed in two cases: (1) when it is full, and (2) by a check that runs every 500ms and flushes manually if more than 900ms has passed since the last flush. After the locked version is implemented, the buffer will be further optimized to be lock-free.

Preliminary tests (NVMe disk, benchmark with 10 threads, Pika QPS at its ceiling):

  1. With binlog disabled, Pika's maximum QPS increases by 30-40%.

  2. A rough batched binlog write based on fwrite (512KB user-space buffer) yields an average QPS gain of about 18% when Pika runs with multiple instances.

  3. Perf shows that do_page_fault accounts for roughly half of the Binlog Put cost; the other half is lock contention (yes, the binlog is also guarded by a single big lock).

These tests show that batching binlog writes is worthwhile.

Phase 1: buffer implementation details

Maintain a buffer of configurable size that we manage ourselves (not via fwrite, because some logic has to run at flush time and fwrite offers no callback hook). Every flush performs an "install offset" operation, i.e. it updates the latest binlog offset kept in the master's memory (as long as a slave's sync progress is behind this offset, the master keeps pushing binlog to that slave, which is why the offset is updated right after the buffer flush) and persists it.
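A minimal C++ sketch of this idea; the names (BinlogWriteBuffer, install_offset_cb) are illustrative assumptions, not the actual Pika types:

#include <cstdint>
#include <cstdio>
#include <cstring>
#include <functional>
#include <mutex>
#include <vector>

// Fixed-size user-space buffer that runs an "install offset" callback on every
// flush -- exactly the hook that plain fwrite buffering cannot provide.
class BinlogWriteBuffer {
 public:
  BinlogWriteBuffer(FILE* file, size_t capacity, std::function<void(uint64_t)> install_offset_cb)
      : file_(file), buf_(capacity), install_offset_cb_(std::move(install_offset_cb)) {}

  // Append one binlog record; flush first if the record would not fit.
  void Append(const char* data, size_t len, uint64_t end_offset) {
    std::lock_guard<std::mutex> lk(mu_);
    if (len > buf_.size()) {              // oversized record: bypass the buffer
      FlushLocked();
      std::fwrite(data, 1, len, file_);
      std::fflush(file_);
      install_offset_cb_(end_offset);
      return;
    }
    if (used_ + len > buf_.size()) FlushLocked();
    std::memcpy(buf_.data() + used_, data, len);
    used_ += len;
    pending_end_offset_ = end_offset;
  }

  // Called by the periodic timer as well as on demand.
  void Flush() {
    std::lock_guard<std::mutex> lk(mu_);
    FlushLocked();
  }

 private:
  void FlushLocked() {
    if (used_ == 0) return;
    std::fwrite(buf_.data(), 1, used_, file_);
    std::fflush(file_);
    used_ = 0;
    install_offset_cb_(pending_end_offset_);  // update + persist the master's binlog offset
  }

  FILE* file_;
  std::vector<char> buf_;
  size_t used_ = 0;
  uint64_t pending_end_offset_ = 0;
  std::function<void(uint64_t)> install_offset_cb_;
  std::mutex mu_;
};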

Phase 2: lock-free optimization of the buffer

Once the locked version is done, optimize the buffer to be lock-free. A straightforward idea is to keep an atomic variable that marks the current write position: every arriving thread adds its binlog body size to this atomic (advancing the write position, effectively claiming a region of the buffer) and then copies its data into the range it claimed, so while one thread is still writing, the next thread can already bump the atomic to claim its own region. A further point: with concurrent writers the buffer may fill up very quickly, and flushing could become the bottleneck. Borrowing the memtable/immutable-memtable (mm/imm) idea from RocksDB, whenever a buffer fills up, switch in a fresh one to keep accepting writes, while the full one plays the role of the imm and is flushed to disk asynchronously.
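A rough sketch of the claim-then-copy idea (an assumption about the future lock-free design, not code in this PR):

#include <atomic>
#include <cstring>
#include <vector>

struct LockFreeBinlogBuffer {
  std::vector<char> data;
  std::atomic<size_t> write_pos{0};

  explicit LockFreeBinlogBuffer(size_t cap) : data(cap) {}

  // Reserve a region with fetch_add, then copy into it; concurrent writers copy
  // into disjoint ranges, so no lock is needed for the copy itself.
  // Returns false when the record does not fit: the caller should switch in a
  // fresh buffer (the RocksDB mm/imm idea) and hand this full, now-immutable one
  // to an asynchronous flusher. A real implementation would also have to track
  // when all claimed copies have completed before flushing.
  bool TryAppend(const char* rec, size_t len) {
    size_t start = write_pos.fetch_add(len, std::memory_order_relaxed);
    if (start + len > data.size()) {
      return false;  // buffer is (or just became) full
    }
    std::memcpy(data.data() + start, rec, len);
    return true;
  }
};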

Key problem A:

If the process crashes, the binlog data sitting in the master's user-space buffer is lost. When a slave later reconnects, it performs incremental sync based on the binlog, but the portion that was still in the user-space buffer never reached disk on the master, so the slave cannot consume it, and master and slave become inconsistent. (MySQL has the same problem and solves it with two-phase commit, but Pika must stay non-intrusive to RocksDB and cannot modify the WAL, so that approach is not an option here.) The solution is to combine a persisted flag with the binlog offset: whenever binlog loss is detected, force slaves to reconnect via full sync.

Solution to key problem A

Whenever the buffer goes from empty to non-empty, persist a flag binlog_buff_empty = false; after every buffer flush, set it back to true. The flag can live in the same file as the persisted binlog offset (the manifest under the binlog directory, accessed via mmap). On every Pika startup, read binlog_buff_empty from the manifest and keep a copy in memory. When a slave tries to establish an incremental connection, add a check before the normal logic: if binlog_buff_empty is false, the master crashed earlier and some binlog was lost, so force the slave to do a full sync.

This raises problem B: while Pika is running, the in-memory copy of the flag stays false, which means every slave connection is forced into full sync. But if a slave that already completed a full sync drops out because of a network blip and then reconnects, it is again forced into full sync, even though incremental sync would actually be acceptable for such a slave.

Solution 1 for problem B: on every Pika startup, also read the binlog offset from the manifest into memory (call it offset B). When a slave requests an incremental connection, first check whether binlog_buff_empty is false; if so, compare the binlog offset carried by the slave with offset B. If the slave's offset is less than or equal to offset B, then from the binlog consumption point of view the slave would have to consume the lost portion of the binlog, so force it into full sync. Otherwise, if the slave's offset is greater than offset B, it must have already connected via full sync before and is simply reconnecting after a disconnect, so incremental sync can be allowed.
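An illustrative version of that decision (DecideSyncMode is a hypothetical helper, not Pika's actual sync code; the first two inputs are the flag and offset B read from the manifest at startup):

#include <cstdint>

enum class SyncMode { kIncremental, kForceFull };

SyncMode DecideSyncMode(bool binlog_buff_empty_at_startup,
                        uint64_t offset_b_at_startup,
                        uint64_t slave_offset) {
  // The master never crashed with unflushed binlog: incremental sync is safe.
  if (binlog_buff_empty_at_startup) {
    return SyncMode::kIncremental;
  }
  // The slave would have to consume binlog that was lost from the buffer.
  if (slave_offset <= offset_b_at_startup) {
    return SyncMode::kForceFull;
  }
  // The slave is ahead of the persisted offset, so it already completed a full
  // sync after the crash and is merely reconnecting.
  return SyncMode::kIncremental;
}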

Solution 2 for problem B: if Pika detects at startup that binlog was lost, simply erase its own ReplicationID. Any slave that connects afterwards will then get an error and will need slaveof force to reconnect (manual handling). Whether to take this path depends on how the DBAs actually operate, so their advice should be sought first.

--------------------------------------------------------------------------------

What this PR mainly does: switch binlog writing to batched writes.

A related issue: users have reported that on IO devices with poor performance, writing the binlog causes latency spikes.

The original code uses Mmap to map 1MB at a time and appends binlog into the mapping. Mmap avoids frequent system calls, but in this append-only binlog workload it keeps triggering page faults (the 1MB mapping is effectively populated page by page on first write; even with some pre-allocation, that cannot offset the disadvantage of writing the binlog one record at a time). Concretely: Mmap maps into the page cache, and every page used for binlog writes is a new page (not yet in the page cache), so each time the memcpy reaches a page boundary it takes a page fault (the target page has not been allocated yet). The page cache then very likely has to evict a page to make room for the new one, which may in turn involve writing back dirty pages; and because the binlog is written one record at a time, there is a gap between consecutive writebacks (it takes a while to fill one page), so the IOs of consecutive writebacks mostly cannot be merged.
The resulting pattern is: memcpy writes a few binlog records, fills up a page, takes a page fault, the page cache writes back one dirty page, and a moment later the cycle repeats. From the block device's point of view, a small IO arrives every so often, with gaps in between, and these cannot be merged into large IOs. Even ignoring the cost of the page faults themselves, small IOs cannot exploit the internal parallelism of an SSD. In this scenario, switching to batched writes helps because: (1) there are no frequent page faults or kernel transitions; (2) although data still goes through the page cache and eviction still happens, a whole batch of pages is written back at once and can be merged into large IOs.

This PR batches binlog writes. In our tests it improves write QPS by about 10% on average with a single RocksDB instance, and by 15-18% on average with multiple RocksDB instances (3 instances) under heavy write load.

Current state: the binlog is written with fwrite through a 512KB user-space buffer, and a timer task has been added that proactively flushes once every 500ms.
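A simplified sketch of this setup as a standalone program, not the PR's actual BufferedWritableFile/TimerTaskThread code ("binlog.tmp" is just a placeholder path):

#include <atomic>
#include <chrono>
#include <cstdio>
#include <mutex>
#include <thread>
#include <vector>

int main() {
  constexpr size_t kBufSize = 512 * 1024;                 // 512KB user-space buffer
  std::vector<char> buf(kBufSize);
  FILE* f = std::fopen("binlog.tmp", "ab");
  if (f == nullptr) return 1;
  std::setvbuf(f, buf.data(), _IOFBF, buf.size());        // route fwrite through our buffer

  std::mutex mu;
  std::atomic<bool> running{true};
  std::thread flusher([&] {                               // timer task: flush every 500ms
    while (running.load()) {
      std::this_thread::sleep_for(std::chrono::milliseconds(500));
      std::lock_guard<std::mutex> lk(mu);
      std::fflush(f);
    }
  });

  {
    std::lock_guard<std::mutex> lk(mu);
    const char rec[] = "one binlog record";
    std::fwrite(rec, 1, sizeof(rec) - 1, f);              // stays buffered until full or flushed
  }

  running = false;
  flusher.join();
  std::fclose(f);
  return 0;
}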

TODO:
1. Unify Pika's timer tasks: attach the timer thread to PikaServer and move the scattered timer tasks onto it (e.g. the blpop expiry scan and the periodic binlog flush).
2. Test performance and reliability further, taking the various edge cases into account, and spell out both the benefits and the costs.

  • Key consideration: this PR introduces a user-space buffer, so the crash-recovery scenarios need to be worked out. Pika is mostly used as a cache replacing Redis, and the practical business requirement is: losing some data on power failure is acceptable, but master and slave must stay consistent (a slave must not hold data that the master has lost). Compared with the original Mmap approach (buffered in the page cache), this PR is at a disadvantage when the process crashes; in the power-failure case there is no qualitative difference.
  • If, after a power failure or process crash, the DBA deletes the slave's ReplicationID and the master's binlog to force a full sync, both this PR and the original code meet the business requirement: a small amount of data is lost, but master and slave stay consistent.

Considering the master-process-crash scenario on its own (it does not seem to have been reported so far, and the probability should be extremely small), the following logic could be added:
After each flush completes, persist a flag binlog_flushed = true; when the next write request arrives and data is written into the buffer, persist binlog_flushed = false. In other words, if data in the buffer was lost, the persisted binlog_flushed will be false. In that case Pika removes its own ReplicationID during recovery, so slaves cannot establish incremental connections; they have to delete their old ReplicationID and reconnect via full sync, which avoids consuming a binlog that is missing the chunk lost from the buffer, and full sync restores master/slave consistency.
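A sketch of that flag logic (a proposal only, not implemented in this PR; BinlogManifest, PersistManifest and ClearReplicationID are hypothetical names):

struct BinlogManifest {            // persisted alongside the binlog offset
  bool binlog_flushed = true;
};

void PersistManifest(const BinlogManifest& m) { /* write the manifest to disk, e.g. mmap or write+fsync */ }
void ClearReplicationID() { /* drop the master's ReplicationID */ }

void OnWriteIntoBuffer(BinlogManifest& m) {
  if (m.binlog_flushed) {
    m.binlog_flushed = false;      // record "there is unflushed data" while it sits only in user space
    PersistManifest(m);
  }
}

void OnFlushCompleted(BinlogManifest& m) {
  m.binlog_flushed = true;         // everything buffered so far has reached disk
  PersistManifest(m);
}

void OnRecovery(const BinlogManifest& m) {
  if (!m.binlog_flushed) {
    // Buffered binlog was lost in the crash: drop the ReplicationID so slaves
    // cannot resume incrementally and must re-sync with a full copy.
    ClearReplicationID();
  }
}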
Of course, in the long run, making the binlog idempotent and using it directly for crash recovery would solve this whole class of problems, but that is genuinely not easy work.

Summary by CodeRabbit

  • New Features

    • Introduced a new method for flushing buffered data to improve file I/O performance and data integrity.
    • Added functions for creating writable files with user-defined buffer sizes, enhancing file handling capabilities.
    • New BufferedWritableFile class provides improved error handling and robust file operations.
  • Bug Fixes

    • Enhanced resource management by ensuring proper stopping of timer tasks in the Binlog class.
  • Documentation

    • Updated comments and documentation to reflect new functionalities and improvements in file handling.


coderabbitai bot commented Aug 6, 2024

Walkthrough

The recent changes enhance file handling and logging capabilities within the Binlog class, introducing new buffering methods and improved resource management. The addition of global constants and new file handling functions promotes efficient I/O operations, while the implementation of a retry mechanism for writes boosts robustness. Overall, these modifications aim to improve performance, data integrity, and error handling across various components.

Changes

Files — Change Summary
include/pika_binlog.h — Added FlushBufferedFile method and timer_task_thread_ member variable in the Binlog class.
include/pika_define.h — Introduced new global constant FWRITE_USER_SPACE_BUF_SIZE set to 512KB.
src/pika_binlog.cc — Enhanced file handling in Binlog by replacing file operations with buffered versions; added FlushBufferedFile and managed timer tasks in the constructor/destructor.
src/pstd/include/env.h — Added NewBufferedWritableFile and BufferedAppendableFile functions to improve file handling capabilities.
src/pstd/src/env.cc — Introduced BufferedWritableFile class and implemented methods for enhanced file operations, including error handling for appending data.

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant Binlog
    participant BufferedWritableFile

    User->>Binlog: Start logging
    Binlog->>BufferedWritableFile: Create buffered writable file
    BufferedWritableFile-->>Binlog: Return writable file
    Binlog->>BufferedWritableFile: Append data
    BufferedWritableFile-->>Binlog: Confirm append
    Binlog->>BufferedWritableFile: Flush buffered data
    BufferedWritableFile-->>Binlog: Confirm flush
    Binlog->>User: Logging successful

Poem

Hop along, oh bits and bytes,
We’ve made our log so bright!
With buffers full and files to write,
Data flows, a pure delight.
To the rhythm of the code, we bounce,
A happy rabbit’s joyful flounce! 🐇✨


@github-actions bot added the Invalid PR Title ✏️ and Feature (New feature or request) labels on Aug 6, 2024
Copy link

@coderabbitai bot left a comment


Actionable comments posted: 5

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR, between commits a2acd88 and 4d3fa74.

Files selected for processing (5)
  • include/pika_binlog.h (3 hunks)
  • include/pika_define.h (1 hunks)
  • src/pika_binlog.cc (6 hunks)
  • src/pstd/include/env.h (2 hunks)
  • src/pstd/src/env.cc (3 hunks)
Files skipped from review due to trivial changes (1)
  • include/pika_define.h
Additional comments not posted (16)
include/pika_binlog.h (2)

16-16: Header inclusion approved.

The inclusion of dispatch_thread.h is necessary for the TimerTaskThread class.


80-80: Additions approved, ensure proper testing.

The FlushBufferedFile method and timer_task_thread_ member variable are appropriate additions for the new buffering mechanism. Ensure that the new functionality is thoroughly tested.

Also applies to: 113-114

src/pstd/include/env.h (2)

71-73: Addition of NewBufferedWritableFile approved.

The NewBufferedWritableFile function enhances file handling by allowing buffered writes, which can improve performance.


74-76: Addition of BufferedAppendableFile approved.

The BufferedAppendableFile function complements NewBufferedWritableFile by providing similar buffering capabilities for appending data, enhancing performance and flexibility.

src/pika_binlog.cc (7)

84-84: Use of NewBufferedWritableFile approved.

The use of NewBufferedWritableFile in the Binlog constructor is appropriate for implementing buffered writes, which can improve performance.


115-115: Use of BufferedAppendableFile approved.

The use of BufferedAppendableFile in the Binlog constructor is appropriate for implementing buffered writes for appending data, enhancing performance.


125-127: Addition of timer task for flushing buffer approved.

The addition of a timer task to flush the buffer every 500 milliseconds ensures that buffered data is periodically written to disk, improving data integrity and performance.


132-132: Stopping of timer task in destructor approved.

Stopping the timer task in the Binlog destructor ensures proper resource management and prevents potential memory leaks or dangling operations.


136-144: Addition of FlushBufferedFile method approved.

The FlushBufferedFile method is crucial for the new buffering mechanism, ensuring that buffered data is written to disk. The method acquires a lock before flushing to ensure thread safety.


227-227: Use of NewBufferedWritableFile in Put method approved.

The use of NewBufferedWritableFile in the Put method is appropriate for implementing buffered writes, which can improve performance.


404-404: Use of BufferedAppendableFile in SetProducerStatus method approved.

The use of BufferedAppendableFile in the SetProducerStatus method is appropriate for implementing buffered writes for appending data, enhancing performance.

Also applies to: 443-443

src/pstd/src/env.cc (5)

484-492: Constructor looks good.

The constructor initializes the class members correctly and asserts that the file pointer is not null.


543-548: Flush method looks good.

The Flush method correctly handles potential errors from fflush.


550-560: Sync method looks good.

The Sync method correctly handles potential errors from fsync.


562-562: Trim method looks good.

The Trim method correctly returns Status::OK as a no-op.


564-565: Filesize method looks good.

The Filesize method correctly returns the current file size.

Comment on lines +535 to +541
Status Close() override {
  if (fclose(file_) != 0) {
    return IOError("fclose failed: " + filename_, errno);
  }
  file_ = nullptr;
  return Status::OK();
}

Handle potential errors from fclose in Close method.

The Close method should handle potential errors from fclose to ensure that any issues during file closing are logged or managed.

-  if (fclose(file_) != 0) {
-    return IOError("fclose failed: " + filename_, errno);
-  }
+  if (fclose(file_) != 0) {
+    auto err = errno;
+    file_ = nullptr;
+    return IOError("fclose failed: " + filename_, err);
  }
  file_ = nullptr;
  return Status::OK();

Comment on lines +494 to +501
~BufferedWritableFile() override {
  if (file_) {
    BufferedWritableFile::Close();
  }
  if (buffer_) {
    free(buffer_);
  }
}

Handle potential errors from fclose in the destructor.

The destructor should handle potential errors from fclose to ensure that any issues during file closing are logged or managed.

-  if (file_) {
-    BufferedWritableFile::Close();
-  }
+  if (file_) {
+    auto s = BufferedWritableFile::Close();
+    if (!s.ok()) {
+      LOG(WARNING) << "Error closing file in destructor: " << s.ToString();
+    }
+  }

Comment on lines +505 to +533
Status Append(const Slice& data) override {
  if (!file_) {
    return IOError("fwrite target: " + filename_ + " is not opened", errno);
  }
  const char* src = data.data();
  size_t left = data.size();
  int32_t max_retries = 4;
  int retry_count = 0;

  while (left > 0) {
    size_t written = fwrite(src, sizeof(char), left, file_);
    if (written == 0) {
      if (ferror(file_)) {
        int err_num = errno;
        clearerr(file_);
        return IOError("fwrite error with " + filename_, err_num);
      }
      if (errno == ENOSPC || ++retry_count > max_retries) {
        return IOError(filename_, errno);
      }
    }
    src += written;
    left -= written;
    curr_file_size_ += written;
    retry_count = 0;
  }

  return Status::OK();
}

Improve handling of ENOSPC and retry logic in Append method.

The Append method should handle ENOSPC more gracefully and ensure that retries are limited to avoid infinite loops.

-  if (errno == ENOSPC || ++retry_count > max_retries) {
+  if (errno == ENOSPC) {
+    return Status::Error("No space left on device: " + filename_);
+  }
+  if (++retry_count > max_retries) {
    return IOError(filename_, errno);
  }

Comment on lines +743 to +785
Status BufferedAppendableFile(const std::string& fname, std::unique_ptr<WritableFile>& result,
                              int32_t user_space_buf_size_bytes, int64_t offset) {
  const int fd = open(fname.c_str(), O_RDWR | O_CLOEXEC, 0644);
  if (fd < 0) {
    return IOError(fname, errno);
  }

  FILE* file = fdopen(fd, "r+");
  if (file == nullptr) {
    close(fd);
    return IOError("Error converting file descriptor to FILE* when NewBufferedWritableFile for file:" + fname, errno);
  }

  if (user_space_buf_size_bytes < 0) {
    fclose(file);
    return Status::Error("user_space_buf_size_bytes must not be negative when NewBufferedAppendableFile");
  }
  char* buf = nullptr;
  int32_t r = 0;
  if (user_space_buf_size_bytes != 0) {
    buf = (char*)malloc(user_space_buf_size_bytes);
    if (!buf) {
      fclose(file);
      return Status::Error("Failed to allocate buffer when BufferedAppendableFile");
    }
    r = setvbuf(file, buf, _IOFBF, user_space_buf_size_bytes);
  } else {
    r = setvbuf(file, nullptr, _IONBF, 0);
  }
  if (r != 0) {
    fclose(file);
    return IOError("Failed to set user space buffer for " + fname, errno);
  }

  // Move the file pointer to the specified offset
  if (fseek(file, offset, SEEK_SET) != 0) {
    fclose(file);
    return IOError("Failed to seek to the specified offset in " + fname, errno);
  }

  result = std::make_unique<BufferedWritableFile>(fname, file, user_space_buf_size_bytes, offset, buf);
  return Status::OK();
}

Improve error handling in BufferedAppendableFile function.

The function should handle errors from buffer allocation and fseek more gracefully.

-  if (user_space_buf_size_bytes != 0) {
-    buf = (char*)malloc(user_space_buf_size_bytes);
-    if (!buf) {
-      fclose(file);
-      return Status::Error("Failed to allocate buffer when BufferedAppendableFile");
-    }
-    r = setvbuf(file, buf, _IOFBF, user_space_buf_size_bytes);
-  } else {
-    r = setvbuf(file, nullptr, _IONBF, 0);
-  }
-  if (r != 0) {
-    fclose(file);
-    return IOError("Failed to set user space buffer for " + fname, errno);
-  }
+  if (user_space_buf_size_bytes != 0) {
+    buf = (char*)malloc(user_space_buf_size_bytes);
+    if (!buf) {
+      fclose(file);
+      return Status::Error("Failed to allocate buffer when BufferedAppendableFile");
+    }
+    r = setvbuf(file, buf, _IOFBF, user_space_buf_size_bytes);
+  } else {
+    r = setvbuf(file, nullptr, _IONBF, 0);
+  }
+  if (r != 0) {
+    free(buf);
+    fclose(file);
+    return IOError("Failed to set user space buffer for " + fname, errno);
  }

Comment on lines +787 to +824
Status NewBufferedWritableFile(const std::string& fname, std::unique_ptr<WritableFile>& result,
                               int32_t user_space_buf_size_bytes) {
  const int fd = open(fname.c_str(), O_CREAT | O_RDWR | O_TRUNC | O_CLOEXEC, 0644);
  if (fd < 0) {
    return IOError(fname, errno);
  }

  FILE* file = fdopen(fd, "w+");
  if (file == nullptr) {
    close(fd);
    return IOError("Error converting file descriptor to FILE* when NewBufferedWritableFile for file:" + fname, errno);
  }

  if (user_space_buf_size_bytes < 0) {
    fclose(file);
    return Status::Error("user_space_buf_size_bytes must not be negative when NewBufferedWritableFile");
  }
  char* buf = nullptr;
  int32_t r = 0;
  if (user_space_buf_size_bytes != 0) {
    buf = (char*)malloc(user_space_buf_size_bytes);
    if (!buf) {
      fclose(file);
      return Status::Error("Failed to allocate buffer when BufferedAppendableFile");
    }
    r = setvbuf(file, buf, _IOFBF, user_space_buf_size_bytes);
  } else {
    r = setvbuf(file, nullptr, _IONBF, 0);
  }
  if (r != 0) {
    fclose(file);
    return IOError("Failed to set user space buffer for " + fname, errno);
  }

  // The file was truncated if it already existed, because open() is called with the O_TRUNC flag.
  result = std::make_unique<BufferedWritableFile>(fname, file, user_space_buf_size_bytes, 0, buf);
  return Status::OK();
}

Improve error handling in NewBufferedWritableFile function.

The function should handle errors from buffer allocation more gracefully.

-  if (user_space_buf_size_bytes != 0) {
-    buf = (char*)malloc(user_space_buf_size_bytes);
-    if (!buf) {
-      fclose(file);
-      return Status::Error("Failed to allocate buffer when BufferedAppendableFile");
-    }
-    r = setvbuf(file, buf, _IOFBF, user_space_buf_size_bytes);
-  } else {
-    r = setvbuf(file, nullptr, _IONBF, 0);
-  }
-  if (r != 0) {
-    fclose(file);
-    return IOError("Failed to set user space buffer for " + fname, errno);
-  }
+  if (user_space_buf_size_bytes != 0) {
+    buf = (char*)malloc(user_space_buf_size_bytes);
+    if (!buf) {
+      fclose(file);
+      return Status::Error("Failed to allocate buffer when BufferedAppendableFile");
+    }
+    r = setvbuf(file, buf, _IOFBF, user_space_buf_size_bytes);
+  } else {
+    r = setvbuf(file, nullptr, _IONBF, 0);
+  }
+  if (r != 0) {
+    free(buf);
+    fclose(file);
+    return IOError("Failed to set user space buffer for " + fname, errno);
  }

@Mixficsol changed the title from "[WIP] feat: use buffered fwrite to write binlog instead of Mmap" to "[WIP] use buffered fwrite to write binlog instead of Mmap" on Oct 12, 2024
@Mixficsol changed the title from "[WIP] use buffered fwrite to write binlog instead of Mmap" to "wip: use buffered fwrite to write binlog instead of Mmap" on Oct 12, 2024
@Mixficsol changed the title from "wip: use buffered fwrite to write binlog instead of Mmap" to "feat: use buffered fwrite to write binlog instead of Mmap" on Oct 12, 2024
@cheniujh changed the base branch from unstable to ospp-refactor_binlog on October 15, 2024 13:16
@cheniujh changed the base branch from ospp-refactor_binlog to unstable on October 15, 2024 13:17
@cheniujh changed the title from "feat: use buffered fwrite to write binlog instead of Mmap" to "[WIP]feat: use buffered fwrite to write binlog instead of Mmap" on Nov 6, 2024
@cheniujh changed the title from "[WIP]feat: use buffered fwrite to write binlog instead of Mmap" to "[WIP]feat: use user space buff to write binlog instead of Mmap" on Nov 17, 2024